package com.peng.my_rocketmq.consumer;

import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
	protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 通过构造函数 实例化对象
     */
    public Consumer() throws MQClientException{

   
			consumer = new DefaultMQPushConsumer("my-spring-rocketmq-syncTagSend2");//消费组
			consumer.setNamesrvAddr("10.9.254.42:9876");
			//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
			//订阅主题和 标签（ * 代表所有标签)下信息
			consumer.subscribe("peng-mq-spring-test", "syncTagSend6");
			// //注册消费的监听 并在此监听中消费信息，并返回消费的状态信息
			consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
			    // msgs中只收集同一个topic，同一个tag，并且key相同的message
			    // 会把不同的消息分别放置到不同的队列中
			    try {
			        for (Message msg : msgs) {

			            //消费者获取消息 这里只输出 不做后面逻辑处理
			            String body = new String(msg.getBody(), "utf-8");
			            log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
			        }
			        System.out.println(1/0);
			    } catch (Exception e) {
			        e.printStackTrace();
			       
			    }
			    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			});

			consumer.start();
	
        System.out.println("消费者 启动成功=======");
    }
}